package flinkstudy.batch.sink;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 导出到es
 *
 * @author daocr
 * @date 2020/7/21
 */
public class MyElasticsearchSink {


    public static void main(String[] args) {

        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<Integer, String>> dataSource = executionEnvironment.fromElements(new Tuple2(1, "张三"),
                new Tuple2(2, "李四"), new Tuple2(3, "李四")
                , new Tuple2(4, "赵六"), new Tuple2(5, "赵六"), new Tuple2(6, "赵六"));


        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));


        dataSource.output(new OutputFormat<Tuple2<Integer, String>>() {
            @Override
            public void configure(Configuration parameters) {

            }

            @Override
            public void open(int taskNumber, int numTasks) throws IOException {

            }

            @Override
            public void writeRecord(Tuple2<Integer, String> record) throws IOException {

            }

            @Override
            public void close() throws IOException {

            }
        })

//        esSinkBuilder.setRestClientFactory(
//                restClientBuilder -> {
//                    restClientBuilder.setDefaultHeaders()
//                    restClientBuilder.setMaxRetryTimeoutMillis(...)
//                    restClientBuilder.setPathPrefix(...)
//                    restClientBuilder.setHttpClientConfigCallback(...)
//                }
//        );
    }

}
